import torch
import torch.nn as nn
# from torch.nn import init
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable
import torchvision.datasets as dset
import torchvision.transforms as transforms
import torchvision.utils as vutils
from torch.utils.data import DataLoader
import torchvision.models as models
import torch.backends.cudnn as cudnn
import torchvision
import torch.autograd as autograd
from PIL import Image
import imp
import os
import sys
import math
import time
import random
import shutil
# import cv2
import scipy.misc
from glob import glob
import sklearn
import logging
from time import time
from tqdm import tqdm
import numpy as np
import matplotlib as mpl
mpl.use('Agg')
import matplotlib.pyplot as plt
plt.style.use('bmh')
%matplotlib inline
if not torch.cuda.is_available():
print("SORRY: No CUDA device")
imageSize = 64
batchSize = 64
nz = 100
ngf = 64
ndf = 64
nc = 3
nd = 2
cuda = True
PATH = 'celeba/'
data = dset.ImageFolder(PATH,
transforms.Compose([
transforms.Scale(imageSize),
transforms.CenterCrop(imageSize),
transforms.ToTensor(),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])
)
dataloader = DataLoader(data, batch_size=batchSize, shuffle=True)
def weights_init(m):
classname = m.__class__.__name__
if classname.find('Conv') != -1:
m.weight.data.normal_(0.0, 0.02)
elif classname.find('BatchNorm') != -1:
m.weight.data.normal_(1.0, 0.02)
m.bias.data.fill_(0)
class _netG(nn.Module):
def __init__(self):
super(_netG, self).__init__()
self.ngpu = 1
self.main = nn.Sequential(
# input is Z, going into a convolution
nn.ConvTranspose2d( nz, ngf * 8, 4, 1, 0, bias=False),
nn.BatchNorm2d(ngf * 8),
nn.ReLU(True),
# state size. (ngf*8) x 4 x 4
nn.ConvTranspose2d(ngf * 8, ngf * 4, 4, 2, 1, bias=False),
nn.BatchNorm2d(ngf * 4),
nn.ReLU(True),
# state size. (ngf*4) x 8 x 8
nn.ConvTranspose2d(ngf * 4, ngf * 2, 4, 2, 1, bias=False),
nn.BatchNorm2d(ngf * 2),
nn.ReLU(True),
# state size. (ngf*2) x 16 x 16
nn.ConvTranspose2d(ngf * 2, ngf, 4, 2, 1, bias=False),
nn.BatchNorm2d(ngf),
nn.ReLU(True),
# state size. (ngf) x 32 x 32
nn.ConvTranspose2d( ngf, nc, 4, 2, 1, bias=False),
nn.Tanh()
# state size. (nc) x 64 x 64
)
def forward(self, input):
if isinstance(input.data, torch.cuda.FloatTensor) and self.ngpu > 1:
output = nn.parallel.data_parallel(self.main, input, range(self.ngpu))
else:
output = self.main(input)
return output
netG = _netG()
netG.apply(weights_init)
netG
class _netD(nn.Module):
def __init__(self):
super(_netD, self).__init__()
self.ngpu = 1
self.main = nn.Sequential(
# input is (nc) x 64 x 64
nn.Conv2d(nc, ndf, 4, 2, 1, bias=False),
nn.LeakyReLU(0.2, inplace=True),
# state size. (ndf) x 32 x 32
nn.Conv2d(ndf, ndf * 2, 4, 2, 1, bias=False),
nn.BatchNorm2d(ndf * 2),
nn.LeakyReLU(0.2, inplace=True),
# state size. (ndf*2) x 16 x 16
nn.Conv2d(ndf * 2, ndf * 4, 4, 2, 1, bias=False),
nn.BatchNorm2d(ndf * 4),
nn.LeakyReLU(0.2, inplace=True),
# state size. (ndf*4) x 8 x 8
nn.Conv2d(ndf * 4, ndf * 8, 4, 2, 1, bias=False),
nn.BatchNorm2d(ndf * 8),
nn.LeakyReLU(0.2, inplace=True),
# state size. (ndf*8) x 4 x 4
nn.Conv2d(ndf * 8, 1, 4, 1, 0, bias=False),
nn.Sigmoid()
)
def forward(self, input):
if isinstance(input.data, torch.cuda.FloatTensor) and self.ngpu > 1:
output = nn.parallel.data_parallel(self.main, input, range(self.ngpu))
else:
output = self.main(input)
return output
return output.view(-1, 1)
netDs = [_netD() for _ in range(nd)]
[netD.apply(weights_init) for netD in netDs]
criterion = nn.BCELoss()
input = torch.FloatTensor(batchSize, 3, imageSize, imageSize)
noise = torch.FloatTensor(batchSize, nz, 1, 1)
fixed_noise = torch.FloatTensor(batchSize, nz, 1, 1).normal_(0, 1)
label = torch.FloatTensor(batchSize)
real_label = 1
fake_label = 0
if cuda:
[netD.cuda() for netD in netDs]
netG.cuda()
criterion.cuda()
input, label = input.cuda(), label.cuda()
noise, fixed_noise = noise.cuda(), fixed_noise.cuda()
input = Variable(input)
label = Variable(label)
noise = Variable(noise)
fixed_noise = Variable(fixed_noise)
lr = 0.0002
beta1 = 0.5
optimizerDs = [optim.Adam(netD.parameters(), lr=lr, betas=(beta1, 0.999)) for netD in netDs]
optimizerG = optim.Adam(netG.parameters(), lr=lr, betas=(beta1, 0.999))
niter = 2
tick = time()
losses = []
for epoch in range(niter):
for i, data in enumerate(dataloader):
############################
# (1) Update D network: maximize log(D(x)) + log(1 - D(G(z)))
###########################
# train with real
for netD, optimizerD in zip(netDs, optimizerDs):
netD.zero_grad()
real_cpu, _ = data
batch_size = real_cpu.size(0)
input.data.resize_(real_cpu.size()).copy_(real_cpu)
label.data.resize_(batch_size).fill_(real_label)
output = netD(input)
errD_real = criterion(output, label)
errD_real.backward()
D_x = output.data.mean()
# train with fake
noise.data.resize_(batch_size, nz, 1, 1)
noise.data.normal_(0, 1)
fake = netG(noise)
label.data.fill_(fake_label)
output = netD(fake.detach())
errD_fake = criterion(output, label)
errD_fake.backward()
D_G_z1 = output.data.mean()
errD = errD_real + errD_fake
optimizerD.step()
############################
# (2) Update G network: maximize log(D(G(z)))
###########################
whichD = np.random.randint(0, nd)
netD = netDs[whichD]
netG.zero_grad()
label.data.fill_(real_label) # fake labels are real for generator cost
output = netD(fake)
errG = criterion(output, label)
errG.backward()
D_G_z2 = output.data.mean()
optimizerG.step()
losses.append((errD.data[0], errG.data[0]))
if i%500 == 0:
print('[{}/{}][{}/{}][{:.2f}] Loss_D: {:.2f} Loss_G: {:.2f} D(x): {:.2f} D(G(z)): {:.2f} / {:.2f}'.\
format(epoch, niter, i, len(dataloader), time() - tick, errD.data[0], errG.data[0], D_x, D_G_z1, D_G_z2))
if i%500 == 0:
fake = netG(fixed_noise).data
show(vutils.make_grid(fake, normalize=True).cpu())
def show(img, fs=(6,6)):
plt.figure(figsize = fs)
plt.imshow(np.transpose(img.numpy(), (1,2,0)))
plt.show()
fake = netG(fixed_noise).data
show(vutils.make_grid(fake[:25], normalize=True, nrow=5).cpu())